package org.databandtech.mockmq;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import org.databandtech.common.Mock;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

public class RabbitMQProducer {
	
	static final String USERNAME = "guest";
	static final String PASSWORD = "guest";
	static final String VIRTUALHOST = "/";
	static final String HOSTNAME = "localhost";
	static final String EXCHANGENAME = "my-exchange";
	final static String QUEUE_NAME="rabbitMQ_QUEUE";
	static final String ROUTINGKEY = "my-key";
	static final String EXPIRATION = "60000";
	static final int COUNT = 100;	
	static final int PORT = 5672;//5672 for regular connections, 5671 for connections that use TLS

	public static void main(String[] args) {

		ConnectionFactory factory = new ConnectionFactory();
		// "guest"/"guest" by default, limited to localhost connections
		factory.setUsername(USERNAME);
		factory.setPassword(PASSWORD);
		factory.setVirtualHost(VIRTUALHOST);
		factory.setHost(HOSTNAME);
		//factory.setPort(PORT);

		try {
			Connection conn = factory.newConnection();
			//获得信道
	        Channel channel = conn.createChannel();
	        // 声明一个队列
	        //channel.queueDeclare(QUEUE_NAME, false, false, false, null);
	        //声明交换器
			AMQP.Exchange.DeclareOk ok = channel.exchangeDeclare(EXCHANGENAME,BuiltinExchangeType.DIRECT,true);
		    System.out.println(ok);
			String queueName = channel.queueDeclare().getQueue();
			channel.queueBind(queueName, EXCHANGENAME, ROUTINGKEY);
			System.out.println("queueName: " + queueName);
					        
			for(int i=0;i<COUNT;i++) {
		        //发布消息
				String msg = org.databandtech.common.Mock.getChineseName()+","+Mock.getDate()+","+
						Mock.getEmail(4, 6)+","+Mock.getRoad()+","+Mock.getNumString(90)+","+
						Mock.getNumString(50)+","+Mock.getNumString(300);
		        byte[] messageBodyBytes = msg.getBytes();
		        
		        push(channel, messageBodyBytes,true);
			}
	        //  publishes a message with expiration
	        //pushWithExpiration(channel, messageBodyBytes,EXPIRATION);
	        
	        //  publishes a message with custom headers
	        //pushWithHead(channel, messageBodyBytes,null);
	       
	        channel.close();
	        conn.close();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (TimeoutException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	private static void push(Channel channel, byte[] messageBodyBytes,boolean isUseExchange) throws IOException {
		
		if (isUseExchange) {
			channel.basicPublish(EXCHANGENAME, ROUTINGKEY, null, messageBodyBytes);
		}else{
			channel.basicPublish("", QUEUE_NAME, null, messageBodyBytes);
		}
	}

	//消息允许过期
	private static void pushWithExpiration(Channel channel, byte[] messageBodyBytes,String expiration) throws IOException {
		channel.basicPublish(EXCHANGENAME, ROUTINGKEY,
			             new AMQP.BasicProperties.Builder()
			               .expiration(expiration)
			               .build(),
			               messageBodyBytes);
	}
	
	//消息允许过期
	private static void pushWithHead(Channel channel, byte[] messageBodyBytes,Map<String, Object> header) throws IOException {
		
		Map<String, Object> headers = new HashMap<String, Object>();
		headers.put("latitude",  51.5252949);
		headers.put("longitude", -0.0905493);
		
		channel.basicPublish(EXCHANGENAME, ROUTINGKEY,
			             new AMQP.BasicProperties.Builder()
			               .headers(headers)
			               .build(),
			               messageBodyBytes);
	}
	

}
